1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
   |    override def createSchedulerBackend(   sc: SparkContext,   masterURL: String,   scheduler: TaskScheduler): SchedulerBackend = {     ...          val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(       apiServerUri,       Some(sc.conf.get(KUBERNETES_NAMESPACE)),       authConfPrefix,       SparkKubernetesClientFactory.ClientType.Driver,       sc.conf,       defaultServiceAccountCaCrt)            ...               val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore)          val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(       snapshotsStore,       kubernetesClient,       sc.conf)          val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(       "kubernetes-executor-pod-polling-sync")     val podsPollingEventSource = new ExecutorPodsPollingSnapshotSource(       sc.conf, kubernetesClient, snapshotsStore, eventsPollingExecutor)          new KubernetesClusterSchedulerBackend(       scheduler.asInstanceOf[TaskSchedulerImpl],       sc,       kubernetesClient,       schedulerExecutorService,       snapshotsStore,       executorPodsAllocator,       executorPodsLifecycleEventHandler,       podsWatchEventSource,       podsPollingEventSource) }
    
  private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient,   snapshotsStore: ExecutorPodsSnapshotsStore) = {           ...     cstr.newInstance(       sc.conf,       sc.env.securityManager,              new KubernetesExecutorBuilder(),       kubernetesClient,       snapshotsStore,       new SystemClock()) }
   |